/*
 * Copyright (C) 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;

import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.conditions.ChainedConditionCheck;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.spanner.conditions.SpannerRowsCheck;
import org.apache.beam.it.gcp.spanner.matchers.SpannerAsserts;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * An integration test for {@link DataStreamToSpanner} Flex template where the shadow tables are in
 * a different database than the target tables, with focus on PK type coverage and updates. This
 * tests handling of INSERT, UPDATE and DELETE events being generated by Datastream in response to
 * source database changes.
 *
 * <p>The resource managers and the Dataflow job are created by the first {@link #setUp()} call and
 * are shared by all test methods through static fields; they are released in {@link #cleanUp()}.
 */
@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
@TemplateIntegrationTest(DataStreamToSpanner.class)
@RunWith(JUnit4.class)
public class SeparateShadowTableDatabasePKFocusedIT extends DataStreamToSpannerITBase {
  private static final String SPANNER_DDL_RESOURCE =
      "SeparateShadowTableDatabaseIT/spanner-schema.sql";

  /** Resource directory holding the Datastream avro files uploaded by the tests. */
  private static final String TEST_DATA_DIR = "SeparateShadowTableDatabaseIT/";

  /** Every test instance that ran {@link #setUp()}, so that each can be torn down at the end. */
  private static final HashSet<SeparateShadowTableDatabasePKFocusedIT> testInstances =
      new HashSet<>();

  /** Launch info of the shared Dataflow job; {@code null} until the first {@link #setUp()}. */
  private static PipelineLauncher.LaunchInfo jobInfo;

  public static PubsubResourceManager pubsubResourceManager;
  public static SpannerResourceManager spannerResourceManager;
  public static SpannerResourceManager shadowSpannerResourceManager;
  public static GcsResourceManager gcsResourceManager;

  /**
   * Setup resource managers and Launch dataflow job once during the execution of this test class.
   *
   * @throws IOException propagated from the resource setup or the job launch
   */
  @Before
  public void setUp() throws IOException {
    // Prevent cleaning up of dataflow job after a test method is executed.
    skipBaseCleanup = true;
    synchronized (SeparateShadowTableDatabasePKFocusedIT.class) {
      testInstances.add(this);
      // Only the first instance to get here creates the shared resources and launches the job.
      if (jobInfo == null) {
        spannerResourceManager = setUpSpannerResourceManager();
        shadowSpannerResourceManager = setUpShadowSpannerResourceManager();
        pubsubResourceManager = setUpPubSubResourceManager();
        gcsResourceManager = setUpSpannerITGcsResourceManager();
        createSpannerDDL(spannerResourceManager, SPANNER_DDL_RESOURCE);
        jobInfo =
            launchDataflowJob(
                getClass().getSimpleName(),
                null,
                null,
                "SeparateShadowTableDatabasePKFocusedIT",
                spannerResourceManager,
                pubsubResourceManager,
                // Extra job parameters identifying the separate shadow table database.
                new HashMap<>() {
                  {
                    put(
                        "shadowTableSpannerInstanceId",
                        shadowSpannerResourceManager.getInstanceId());
                    put(
                        "shadowTableSpannerDatabaseId",
                        shadowSpannerResourceManager.getDatabaseId());
                  }
                },
                null,
                null,
                gcsResourceManager);
      }
    }
  }

  /**
   * Cleanup dataflow job and all the resources and resource managers.
   *
   * <p>The resource managers are cleaned even when tearing down one of the test instances fails.
   *
   * @throws IOException propagated from the tear down of a test instance
   */
  @AfterClass
  public static void cleanUp() throws IOException {
    try {
      for (SeparateShadowTableDatabasePKFocusedIT instance : testInstances) {
        instance.tearDownBase();
      }
    } finally {
      // Always release the shared resources, otherwise a failed tear down would leak them.
      ResourceManagerUtils.cleanResources(
          spannerResourceManager,
          shadowSpannerResourceManager,
          pubsubResourceManager,
          gcsResourceManager);
    }
  }

  /**
   * This test works on a simple schema of 2 columns: id INT (PK), and val int. It tests INSERT,
   * DELETE, UPDATE of val (non-pk) and update of id (PK, datastream generates an INSERT and DELETE
   * event for PK updates).
   */
  @Test
  public void migrationTestSimpleTable() {
    runMigrationStages("my_table", "my_table-simpleTest");

    // Assert specific rows
    assertMyTableContents();
  }

  /**
   * This test works on a schema having PK consisting of all possible data types. It tests INSERT,
   * DELETE, UPDATE of val (non-pk) and update of PK column (datastream generates an INSERT and
   * DELETE event for PK updates).
   *
   * <p>TODO: Add JSON type to PK once lock hint via read API is supported.
   */
  @Test
  public void migrationTestAllDataType() {
    runMigrationStages("alltypes", "alltypes-test");

    // Assert specific rows
    assertAllTypesContents();
  }

  /**
   * Uploads the three Datastream files of a scenario, waits for the expected row counts in
   * Spanner, asserts that all conditions were met and then sleeps for the cutover time.
   *
   * @param table name of the Spanner table the events are applied to
   * @param filePrefix common prefix of the avro file names of the scenario
   */
  private void runMigrationStages(String table, String filePrefix) {
    ChainedConditionCheck conditionCheck = buildMigrationConditionCheck(table, filePrefix);

    // Wait for conditions
    PipelineOperator.Result result =
        pipelineOperator()
            .waitForCondition(createConfig(jobInfo, Duration.ofMinutes(8)), conditionCheck);

    // Assert Conditions
    assertThatResult(result).meetsConditions();

    // Sleep for cutover time to wait till all CDCs propagate.
    // A real world customer also has a small cut over time to reach consistency.
    sleepForCutover();
  }

  /**
   * Constructs a ChainedConditionCheck with 6 stages.
   *
   * <ol>
   *   <li>Send 3 INSERTs.
   *   <li>Wait on Spanner to have 3 rows.
   *   <li>Send an UPDATE val and a DELETE.
   *   <li>Wait on Spanner to have 2 rows.
   *   <li>Send INSERT and DELETE (PK UPDATE).
   *   <li>Wait on Spanner to have 2 rows.
   * </ol>
   *
   * @param table name of the Spanner table the events are applied to
   * @param filePrefix common prefix of the avro file names of the scenario
   * @return the chained check covering all 6 stages
   */
  private ChainedConditionCheck buildMigrationConditionCheck(String table, String filePrefix) {
    String insertsFile = filePrefix + "-3-inserts.avro";
    String updateDeleteFile = filePrefix + "-update-delete.avro";
    String pkUpdateFile = filePrefix + "-pk-update.avro";

    return ChainedConditionCheck.builder(
            List.of(
                uploadDataStreamFile(
                    jobInfo, table, insertsFile, TEST_DATA_DIR + insertsFile, gcsResourceManager),
                exactRowCount(table, 3),
                uploadDataStreamFile(
                    jobInfo,
                    table,
                    updateDeleteFile,
                    TEST_DATA_DIR + updateDeleteFile,
                    gcsResourceManager),
                exactRowCount(table, 2),
                uploadDataStreamFile(
                    jobInfo, table, pkUpdateFile, TEST_DATA_DIR + pkUpdateFile, gcsResourceManager),
                exactRowCount(table, 2)))
        .build();
  }

  /**
   * Builds a check on the target database that uses {@code rows} as both the minimum and the
   * maximum row count of {@code table}.
   *
   * @param table name of the Spanner table to count rows in
   * @param rows the expected number of rows
   * @return the row count check
   */
  private SpannerRowsCheck exactRowCount(String table, int rows) {
    return SpannerRowsCheck.builder(spannerResourceManager, table)
        .setMinRows(rows)
        .setMaxRows(rows)
        .build();
  }

  /**
   * Sleeps for {@code CUTOVER_MILLIS}. The sleep is only a safety delay, so an interruption ends
   * it early instead of failing the test, but the interrupt status of the thread is restored so
   * that the interruption stays visible to the code that runs afterwards.
   */
  private void sleepForCutover() {
    try {
      Thread.sleep(CUTOVER_MILLIS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Asserts that {@code my_table} holds exactly the two rows expected after all the events. */
  private void assertMyTableContents() {
    List<Map<String, Object>> expectedRows = new ArrayList<>();

    Map<String, Object> row1 = new HashMap<>();
    row1.put("id", 2);
    row1.put("val", 20);

    Map<String, Object> row2 = new HashMap<>();
    row2.put("id", 10);
    row2.put("val", 10);

    expectedRows.add(row1);
    expectedRows.add(row2);

    SpannerAsserts.assertThatStructs(spannerResourceManager.runQuery("select * from my_table"))
        .hasRecordsUnorderedCaseInsensitiveColumns(expectedRows);
  }

  /** Asserts that {@code alltypes} holds exactly the two rows expected after all the events. */
  private void assertAllTypesContents() {
    List<Map<String, Object>> expectedRecords = new ArrayList<>();

    // First record
    Map<String, Object> record1 = new HashMap<>();
    record1.put("bool_field", true);
    record1.put("int64_field", 2L);
    record1.put("float64_field", 3.14159265359);
    record1.put("string_field", "This is a test string for MySQL.");
    record1.put("bytes_field", "VGhpcyBpcyBzb21lIGJpbmFyeSBkYXRh");
    record1.put("timestamp_field", "2024-12-20T10:30:00Z");
    record1.put("date_field", "2024-12-20");
    record1.put("numeric_field", "12345.1234");
    record1.put("val", 20);

    // Second record
    Map<String, Object> record2 = new HashMap<>();
    record2.put("bool_field", true);
    record2.put("int64_field", 10L);
    record2.put("float64_field", 3.14159265359);
    record2.put("string_field", "This is a test string for MySQL.");
    record2.put("bytes_field", "VGhpcyBpcyBzb21lIGJpbmFyeSBkYXRh");
    record2.put("timestamp_field", "2024-12-20T10:30:00Z");
    record2.put("date_field", "2024-12-20");
    record2.put("numeric_field", "12345.1234");
    record2.put("val", 10);

    expectedRecords.add(record1);
    expectedRecords.add(record2);

    SpannerAsserts.assertThatStructs(spannerResourceManager.runQuery("SELECT * FROM alltypes"))
        .hasRecordsUnorderedCaseInsensitiveColumns(expectedRecords);
  }
}
